Thread Versus Coroutine: A Comparison


In [2]:
import threading
import itertools
import time
import sys

class Signal:
    """Mutable flag shared between supervisor() and the spinner thread."""

    # Class-level default. supervisor() sets `go` to False on its instance
    # to tell spin() to leave its loop.
    go = True
    
def spin(msg, signal):
    """Animate a text spinner on stdout until `signal.go` becomes false.

    Args:
        msg: text shown to the right of the spinning character.
        signal: object whose `go` attribute is checked after every frame.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        out.write(status)
        out.flush()
        # Backspaces move the cursor back to the start of the status text.
        out.write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    # Overwrite the last frame with spaces, then move the cursor back again.
    out.write(' ' * len(status) + '\x08' * len(status))
    
def slow_function():
    """Stand in for a long blocking I/O call: sleep three seconds, return 42."""
    answer = 42
    time.sleep(3)  # blocks the calling thread for the whole delay
    return answer

def supervisor():
    """Run slow_function() while spin() animates in a separate thread.

    Returns:
        Whatever slow_function() returns.

    Raises:
        Any exception raised by slow_function(). The spinner thread is
        stopped and joined before the exception propagates.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    try:
        # Blocks this thread; the spinner thread keeps animating meanwhile.
        result = slow_function()
    finally:
        # Always release the spinner: without this, a failure in
        # slow_function() would leave the thread looping forever.
        signal.go = False
        spinner.join()
    return result

def main():
    """Run supervisor() and print the value it returns."""
    print('Answer:', supervisor())


if __name__ == '__main__':
    main()


spinner object: <Thread(Thread-6, initial)>
| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!           Answer: 42

In [ ]:
import asyncio
import itertools
import sys

@asyncio.coroutine
def spin(msg):
    """Animate a text spinner on stdout until this coroutine is cancelled.

    Args:
        msg: text shown to the right of the spinning character.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        status = next(frames) + ' ' + msg
        out.write(status)
        out.flush()
        # Backspaces move the cursor back to the start of the status text.
        out.write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            # Cancellation is the stop request: leave the loop and clean up.
            break
    # Overwrite the last frame with spaces, then move the cursor back again.
    out.write(' ' * len(status) + '\x08' * len(status))
    
@asyncio.coroutine
def slow_function():
    """Stand in for a long I/O wait: sleep three seconds, then return 42."""
    answer = 42
    # asyncio.sleep suspends only this coroutine for the delay.
    yield from asyncio.sleep(3)
    return answer

@asyncio.coroutine
def supervisor():
    """Run slow_function() while spin() animates as a separate task.

    Returns:
        Whatever slow_function() returns.

    Raises:
        Any exception raised by slow_function(). The spinner task is
        cancelled before the exception propagates.
    """
    # asyncio.ensure_future() schedules the coroutine as a task. The older
    # spelling asyncio.async() no longer parses: `async` has been a reserved
    # word since Python 3.7.
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    try:
        result = yield from slow_function()
    finally:
        # Request cancellation whether or not the call succeeded; spin()
        # handles CancelledError by leaving its loop.
        spinner.cancel()
    return result
    
def main():
    """Drive supervisor() to completion on the event loop and print its result."""
    event_loop = asyncio.get_event_loop()
    answer = event_loop.run_until_complete(supervisor())
    event_loop.close()
    print('Answer:', answer)


if __name__ == '__main__':
    main()

Downloading with asyncio and aiohttp


In [1]:
import asyncio

import aiohttp

from flags import BASE_URL, save_flag, show, main

@asyncio.coroutine
def get_flag(cc):
    """Request the GIF for country code `cc` and return what resp.read() yields.

    The URL is built from BASE_URL and the lower-cased country code.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=code)
    response = yield from aiohttp.request('GET', url)
    return (yield from response.read())

@asyncio.coroutine
def download_one(cc):
    """Fetch one flag, report it through show(), and save it as '<cc>.gif'.

    Returns:
        The country code that was passed in.
    """
    flag_bytes = yield from get_flag(cc)
    show(cc)
    filename = cc.lower() + '.gif'
    save_flag(flag_bytes, filename)
    return cc

def download_many(cc_list):
    """Download every flag in `cc_list` concurrently on the event loop.

    Returns:
        The number of download_one() coroutines that completed.
    """
    loop = asyncio.get_event_loop()
    coros = [download_one(code) for code in sorted(cc_list)]
    # asyncio.wait() hands back (done, pending); only `done` is counted.
    finished, _ = loop.run_until_complete(asyncio.wait(coros))
    loop.close()

    return len(finished)


if __name__ == '__main__':
    main(download_many)


BD ID RU CN IN TR VN JP FR DE NG BR ET PH US MX PK IR CD EG 
20 flags downloaded in 1.28s

Enhancing the asyncio downloader Script

Using asyncio.as_completed


In [ ]:
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# Default number of concurrent requests. Kept low to avoid errors from the
# remote site, such as 503 - Service Temporarily Unavailable.
DEFAULT_CONCUR_REQ = 5
# Passed to main() alongside the default when the script is run; presumably
# the upper limit main() accepts for concurrency (confirm in flags2_common).
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
    """Wraps a failed flag download and records which country code failed."""

    def __init__(self, country_code):
        # Stored on the instance so the code that catches this error can
        # report which download went wrong.
        self.country_code = country_code
        
@asyncio.coroutine
def get_flag(base_url, cc):
    """Request the GIF for country code `cc` from `base_url`.

    Args:
        base_url: prefix the flag URL is built from.
        cc: country code; lower-cased when building the URL.

    Returns:
        What resp.read() yields for a 200 response.

    Raises:
        web.HTTPNotFound: the response status is 404.
        aiohttp.HttpProcessingError: any other non-200 status.
    """
    # Format with the base_url argument. The previous code used the name
    # BASE_URL, which this cell never imports, and ignored the parameter.
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)
        
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch and save one flag while holding `semaphore`.

    Args:
        cc: country code of the flag.
        base_url: passed through to get_flag().
        semaphore: held for the duration of the fetch.
        verbose: when true, print the country code and outcome.

    Returns:
        Result(status, cc), where status is HTTPStatus.ok or
        HTTPStatus.not_found.

    Raises:
        FetchError: chained from any exception other than web.HTTPNotFound
            raised during the fetch.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        # A missing flag is an expected outcome, reported through the status.
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        # Attach the country code; the original error stays in __cause__.
        raise FetchError(cc) from exc
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    # Return the local `status`; the capitalised name `Status` used here
    # before is not defined anywhere and raised NameError.
    return Result(status, cc)

@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    """Download all flags concurrently and tally the outcomes.

    Args:
        cc_list: country codes to download (processed in sorted order).
        base_url: passed through to download_one().
        verbose: print per-error messages when true; otherwise wrap the
            iteration in a tqdm progress bar.
        concur_req: initial value of the semaphore shared by the downloads.

    Returns:
        collections.Counter keyed by status, counting one entry per download.
    """
    semaphore = asyncio.Semaphore(concur_req)
    pending = [download_one(cc, base_url, semaphore, verbose)
               for cc in sorted(cc_list)]

    # as_completed() yields awaitables in the order the downloads finish.
    completions = asyncio.as_completed(pending)
    if not verbose:
        completions = tqdm.tqdm(completions, total=len(cc_list))

    tally = collections.Counter()
    for next_done in completions:
        try:
            outcome = yield from next_done
        except FetchError as exc:
            cause = exc.__cause__
            try:
                detail = cause.args[0]
            except IndexError:
                # The underlying exception carried no args; use its class name.
                detail = cause.__class__.__name__
            if verbose and detail:
                print('*** Error for {}: {}'.format(exc.country_code, detail))
            status = HTTPStatus.error
        else:
            status = outcome.status

        tally[status] += 1

    return tally

def download_many(cc_list, base_url, verbose, concur_req):
    """Run downloader_coro() to completion and return its counts."""
    event_loop = asyncio.get_event_loop()
    counts = event_loop.run_until_complete(
        downloader_coro(cc_list, base_url, verbose, concur_req))
    event_loop.close()

    return counts


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Using an Executor to Avoid Blocking the Event Loop


In [2]:
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch one flag while holding `semaphore`, then save it off the event loop.

    Args:
        cc: country code of the flag.
        base_url: passed through to get_flag().
        semaphore: held for the duration of the fetch.
        verbose: when true, print the country code and outcome.

    Returns:
        Result(status, cc), where status is HTTPStatus.ok or
        HTTPStatus.not_found.

    Raises:
        FetchError: chained from any exception other than web.HTTPNotFound
            raised during the fetch.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        # save_flag runs in the loop's default executor (executor=None), so
        # it does not block the event loop. The returned future is waited on
        # so that an error raised by save_flag surfaces here and 'OK' is
        # reported only after the save has finished.
        yield from loop.run_in_executor(None,
                                        save_flag, image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    # Return the local `status`; the capitalised name `Status` used here
    # before is not defined anywhere and raised NameError.
    return Result(status, cc)

From Callbacks to Futures and Coroutines

Doing Multiple Requests for Each Download


In [ ]:
@asyncio.coroutine
def http_get(url):
    """GET `url` and return its decoded JSON or its raw body.

    Returns:
        The result of res.json() when the Content-type header mentions
        'json' or the URL ends with 'json'; otherwise the result of
        res.read().

    Raises:
        web.HTTPNotFound: the response status is 404.
        aiohttp.errors.HttpProcessingError: any other non-200 status.
    """
    response = yield from aiohttp.request('GET', url)
    status = response.status
    if status == 404:
        raise web.HTTPNotFound()
    if status != 200:
        raise aiohttp.errors.HttpProcessingError(
            code=status, message=response.reason,
            headers=response.headers)

    content_type = response.headers.get('Content-type', '').lower()
    wants_json = 'json' in content_type or url.endswith('json')
    if wants_json:
        return (yield from response.json())
    return (yield from response.read())
        
@asyncio.coroutine
def get_country(base_url, cc):
    """Fetch '<base_url>/<cc>/metadata.json' and return its 'country' entry."""
    metadata_url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
    info = yield from http_get(metadata_url)
    return info['country']

@asyncio.coroutine
def get_flag(base_url, cc):
    """Fetch '<base_url>/<cc>/<cc>.gif' through http_get() and return the result."""
    code = cc.lower()
    flag_url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    image = yield from http_get(flag_url)
    return image

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch a flag and its country name, then save the flag off the event loop.

    The semaphore is acquired separately for each of the two requests.
    The file is saved as '<country>.<cc>.gif', with spaces in the country
    name replaced by underscores.

    Args:
        cc: country code of the flag.
        base_url: passed through to get_flag() and get_country().
        semaphore: held during each request.
        verbose: when true, print the country code and outcome.

    Returns:
        Result(status, cc), where status is HTTPStatus.ok or
        HTTPStatus.not_found.

    Raises:
        FetchError: chained from any exception other than web.HTTPNotFound
            raised during either request.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
        with (yield from semaphore):
            country = yield from get_country(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        country = country.replace(' ', '_')
        filename = '{}.{}.gif'.format(country, cc)
        loop = asyncio.get_event_loop()
        # save_flag runs in the loop's default executor (executor=None), so
        # it does not block the event loop. The returned future is waited on
        # so that an error raised by save_flag surfaces here and 'OK' is
        # reported only after the save has finished.
        yield from loop.run_in_executor(None, save_flag, image, filename)
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)

Writing asyncio Servers

An asyncio TCP Server


In [ ]:
import sys
import asyncio

from charfinder import UnicodeNameIndex                #1

# Line terminator appended to every encoded line sent to a client.
CRLF = b'\r\n'
# Prompt bytes written to the client before each query is read.
PROMPT = b'?>'

# Module-level index instance used by handle_queries() for every client.
index = UnicodeNameIndex()

@asyncio.coroutine
def handle_queries(reader, writer):
    """Serve one client: prompt, read a query line, and send back matches.

    The session ends when the client disconnects (EOF) or sends a line that
    starts with a control character (code point below 32). Undecodable
    input is treated as a control character. Blank lines just re-prompt.

    Args:
        reader: stream the query lines are read from.
        writer: stream the prompt and results are written to.
    """
    while True:
        writer.write(PROMPT)        # plain method: nothing to yield from
        yield from writer.drain()   # coroutine: flushes the write buffer
        data = yield from reader.readline()
        if not data:
            # readline() returns empty bytes at EOF: the client has gone
            # away, so stop instead of re-prompting a closed connection.
            break
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # Treat undecodable input like a control character (ends session).
            query = '\x00'
        client = writer.get_extra_info('peername')
        print('Received from {}: {!r}'.format(client, query))
        if query:
            if ord(query[:1]) < 32:
                break
            lines = list(index.find_description_strs(query))
            if lines:
                writer.writelines(line.encode() + CRLF for line in lines)
            writer.write(index.status(query, len(lines)).encode() + CRLF)

            yield from writer.drain()
            print('Sent {} results'.format(len(lines)))

    print('Close the client socket')
    writer.close()
    
def main(address='127.0.0.1', port=2323):
    """Start the TCP server and serve until interrupted with CTRL-C.

    Args:
        address: interface to bind to.
        port: port to listen on; converted with int(), so a string taken
            from the command line is accepted.
    """
    port = int(port)
    loop = asyncio.get_event_loop()
    # No loop= argument: start_server() falls back to the current event
    # loop, which is the one obtained above. Passing loop= explicitly is
    # rejected on Python 3.10 and later.
    server_coro = asyncio.start_server(handle_queries, address, port)
    server = loop.run_until_complete(server_coro)

    host = server.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))
    try:
        loop.run_forever()
    except KeyboardInterrupt:  # CTRL+C pressed
        pass

    print('Server shutting down.')
    server.close()
    # wait_closed() is a coroutine; drive it before closing the loop.
    loop.run_until_complete(server.wait_closed())
    loop.close()


if __name__ == '__main__':
    main(*sys.argv[1:])

An aiohttp Web Server


In [ ]:
import asyncio

def home(request):
    """Handle a page request: run the optional 'query' search and render the page.

    Args:
        request: object whose GET mapping may hold a 'query' entry.

    Returns:
        web.Response whose text is `template` filled in with the query, the
        formatted result rows, and a status message.
    """
    query = request.GET.get('query', '').strip()
    print('Query: {!r}'.format(query))

    # Defaults for an empty query; replaced below when there is one.
    descriptions = []
    rows = ''
    message = 'Enter words describing characters.'
    if query:
        descriptions = list(index.find_descriptions(query))
        rows = '\n'.join(ROW_TPL.format(**vars(descr))
                         for descr in descriptions)
        message = index.status(query, len(descriptions))

    # NOTE(review): `query` is interpolated as received; if `template` is
    # HTML it probably needs escaping first - confirm against the template.
    page = template.format(query=query, result=rows, message=message)
    print('Sending {} results'.format(len(descriptions)))
    return web.Response(content_type=CONTENT_TYPE, text=page)

@asyncio.coroutine
def init(loop, address, port):
    """Create the web application, route GET '/' to home(), and start listening.

    Returns:
        The socket name of the server's first listening socket.
    """
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', home)
    # The application's handler serves as the protocol factory for the server.
    server = yield from loop.create_server(app.make_handler(), address, port)
    first_socket = server.sockets[0]
    return first_socket.getsockname()

def main(address="127.0.0.1", port=8888):
    """Start the web server and serve until interrupted with CTRL-C.

    Args:
        address: interface to bind to.
        port: port to listen on; converted with int(), so a string taken
            from the command line is accepted.
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    host = event_loop.run_until_complete(
        init(event_loop, address, port_number))
    print('Serving on {}, Hit CTRL-C to stop.'.format(host))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # CTRL-C is the normal way to stop; continue to the shutdown steps.
        pass
    print('Server shutting down.')
    event_loop.close()


if __name__ == '__main__':
    main(*sys.argv[1:])